/**
 * FileName: StructuredStreamingHDFS3
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/02/14 11:35
 * Description: Reads streaming data from HDFS, applies simple processing (splitting and
 *              Redis-based filtering), and is intended to store the result back to HDFS;
 *              the current sink writes to the console for debugging.
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;

import cn.com.bonc.process.chain.ProcessChain;
import cn.com.bonc.process.impl.SingleColum2MutiProcessImpl;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.immutable.Map;


public class StructuredStreamingHDFS3 {

	/** HDFS directory the stream reads raw text records from. */
	private static final String SOURCE_PATH = "/test_szj/source_data";

	/**
	 * Intended HDFS output location.
	 * NOTE(review): currently unused — the streaming sink below writes to the
	 * console; keep until the HDFS parquet sink described in the file header
	 * is (re)implemented.
	 */
	private static final String SAVE_PATH = "hdfs://192.168.70.21:9000/test_szj/parquet7";

	public static void main(String[] args) {

		SparkSession spark = SparkSession
				.builder()
				.appName("MyHDFASTestApp2")
				.getOrCreate();

		// Read a streaming Dataset of text lines from HDFS.
		Dataset<Row> hdfsDataset = spark
				.readStream()
				.option("latestFirst", "false")
				.option("fileNameOnly", "false")
				.text(SOURCE_PATH);

		System.out.println("===============================>kafka："+hdfsDataset.columns().length);

		// Run the source through the processing chain, which splits the single
		// text column into multiple columns (see SingleColum2MutiProcessImpl).
		Dataset<Row> dataset = ProcessChain
				.setSourceData(hdfsDataset)
				.addProcess(new SingleColum2MutiProcessImpl())
				.execute();

		// Redis connection settings must be in place before the redis-format
		// read below resolves its connection.
		spark.conf().set("spark.redis.host", ConfigurationManager.getProperty(Constants.REDIS_IP));
		spark.conf().set("spark.redis.port", ConfigurationManager.getProperty(Constants.REDIS_PORT));

		// Load the "Xdata" table from Redis as a static (batch) Dataset and
		// expose it as a temp view for the filtering query below.
		// Fix: registerTempTable is deprecated since Spark 2.0 — use
		// createOrReplaceTempView instead (same semantics).
		spark.read()
				.format("org.apache.spark.sql.redis")
				.option("table", "Xdata")
				.load()
				.createOrReplaceTempView("redis");

		// Debug: dump every effective Spark configuration entry.
		Map<String, String> all = spark.conf().getAll();
		Iterator<Tuple2<String, String>> iterator = all.iterator();
		while (iterator.hasNext()) {
			Tuple2<String, String> next = iterator.next();
			System.out.println("5==============================>key:"+next._1+" value:"+next._2);
		}

		// Keep only rows whose phone exists as a key in the Redis table and
		// whose url column is not the literal string 'null'.
		dataset.createOrReplaceTempView("hdfs");
		dataset = spark.sql("select phone,RID,mergeRecord,URL from hdfs where url != 'null' and phone in(select key from redis)");

		// Console sink for debugging; Append mode emits only rows added since
		// the previous trigger.
		StreamingQuery query = dataset
				.writeStream()
				.format("console")
				.outputMode(OutputMode.Append())
				.option("truncate", "false")
				.start();

		// Block until the query terminates. Fix: rethrow with the cause
		// preserved instead of swallowing the failure via printStackTrace(),
		// so the driver exits non-zero on a streaming failure.
		try {
			query.awaitTermination();
		} catch (StreamingQueryException e) {
			throw new RuntimeException("Streaming query terminated with error", e);
		}
	}

}